"""Spark Streaming word count over a Kafka topic.

Reads messages from the Kafka topic ``test`` via the direct (receiver-less)
API, splits each message value on spaces, and prints per-batch word counts
every 2 seconds.
"""
# findspark must run BEFORE any pyspark import: it locates the Spark
# installation and adds it to sys.path. Calling it after `import pyspark`
# (as the original did) has no effect on the imports that already ran.
import findspark
findspark.init()

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName='KafkaWordCount')
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 2)  # 2-second micro-batch interval

brokers = '192.168.118.131:9092'
topic = 'test'

# Use the direct stream: its signature (ssc, [topics], kafkaParams) matches
# the arguments being passed here. The original called createStream — whose
# signature is (ssc, zkQuorum, groupId, {topic: partitions}) — with a topic
# list and a broker dict, which would raise at runtime. The config key is
# "metadata.broker.list" ("setadate" was a typo).
kafkaStreams = KafkaUtils.createDirectStream(
    ssc, [topic], {"metadata.broker.list": brokers})

# Each record is a (key, value) pair; keep only the message value.
lines = kafkaStreams.map(lambda x: x[1])
count = lines.flatMap(lambda line: line.split(" "))\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda a, b: a + b)

count.pprint()
ssc.start()
ssc.awaitTermination()


